/**
*@author tonychao 
*使用启发式规则设置executor 的cores 和memory，以及cores per task
*/

package huristic;

import java.util.ArrayList;
import java.util.Comparator;

import org.omg.CORBA.IRObject;

import others.Global;
import others.Global.SparkMode;


public class HuristicConf {
	int  RDDparallelism,maxParallelism;
	SparkMode mode;
	
	public static final ArrayList<Integer> cores=Global.CORES;
	public static final ArrayList<Integer> memorys=Global.MEMORYS_IN_MB;
	
	//集群信息已经在配置文件里了，所以只需要输入文件的大小和模式
	public HuristicConf(int inputFileSizeInMB,SparkMode mode){
		this.mode=mode;
		if (this.mode==null){
			this.mode= Global.DEFAULT_MODE;
		}
		setExecutorConf();//根据standalone 模式 或者 yarn 模式 配置集群信息
		navieRoundAdjustments(inputFileSizeInMB);//目前使用最简单的round整理模式
	}
	
	public ArrayList<Estimator> executorConfList= new ArrayList<>();
	
	@SuppressWarnings("rawtypes")
	Comparator c = new Comparator<StandAloneResourceEstimator>() {  
        @Override
        public int compare(StandAloneResourceEstimator o1, StandAloneResourceEstimator o2) {  
            // TODO Auto-generated method stub  
            if(o1.avalibaleParallelism < o2.avalibaleParallelism)  
                return 1;  
            //注意！！返回值必须是一对相反数，否则无效。jdk1.7以后就是这样。    
            else return -1;  
        } 
	};
	
	
	//使用启发式规则设置
	//executor.cores 
	//executor.memory
	//cores per task,返回的是一个列表的可行解 最多n_workers*cores*memory/128 步计算 1W步以内
	private void setExecutorConf(){
		int cores_per_task =1;	//为了保证并行性，直接设置为1

		int maxCores=Integer.MIN_VALUE;
		for (int i :cores){
			if (maxCores<i) maxCores=i;
		}
		int minCores=Integer.MAX_VALUE;
		for (int i :cores){
			if (minCores>i) minCores=i;
		}
		
		int MaxMemory=-1;
		for (int i :memorys){
			if (MaxMemory<i) MaxMemory=i;
		}
		
		//standalone 模式下的准确性能估算
		if (mode.equals(SparkMode.SPARK_STAND_ALONE)){
			//枚举可能的核心数
			for (int executor_cores=1;executor_cores<=maxCores;executor_cores++){
				this.executorConfList.add(new StandAloneResourceEstimator(executor_cores, 512, cores_per_task));
			}
			this.executorConfList.sort(c); 
			//比较并行性 初始条件下每个core仅有512M内存，这时并行性仅仅取决于cores的分配仅仅保留并行性最大的那一批参数
			for(int i =this.executorConfList.size()-1;i>=1;i--){
				if(this.executorConfList.get(i).avalibaleParallelism==0) System.err.println("HuristicConf 出现异常情况");
				if (this.executorConfList.get(i).avalibaleParallelism<this.executorConfList.get(i-1).avalibaleParallelism)  this.executorConfList.remove(i);
			}
			
			//可能的使得并行度最大的核心数下使用的最大内存（不应小于 512M）
			for (Estimator estimator :this.executorConfList){
				int previous_parallelism=0;
				for (int currentMemory=512; currentMemory<= MaxMemory;currentMemory+=128){ 
					estimator.conf(estimator.executor_cores, currentMemory, estimator.cores_per_task); //实验新的配置下的并行度
					if (estimator.avalibaleParallelism<previous_parallelism) {//内存的进一步扩张会使得情况恶化，回退一步，结束搜索
						estimator.conf(estimator.executor_cores, currentMemory-128, estimator.cores_per_task);
						break;
					}
					previous_parallelism= estimator.avalibaleParallelism;
				}
			}
			/*
			for(StandAloneResourceEstimator e:this.executorConf){
				System.out.println(e.availableCores);
			}
			*/
		}
		
		
		//yarn 模式下的近似启发式解
		else if (mode.equals(SparkMode.SPARK_ON_YARN)){
			//枚举可能的核心数,但是在yarn模式下，这一参数并不会在实际上在每个节点中开启超过一个executor
			//每个executor的并行度到时直接受限于硬件线程。
			for (int executor_cores=1;executor_cores<=maxCores;executor_cores++){
				//this.executorConf.add(new yarnResourceEstimator(executor_cores, 512, cores_per_task));
			}
			//每个executor应该分配一个较大的内存，因为一个节点只有一个executor，但是同样内存过大容易带不起来，yarn的调度机制暂时没有清晰的研究
			this.executorConfList.clear();
			int min= Integer.MAX_VALUE;
			for (int i=0;i<cores.size();i++){
				int tmp = (int)(memorys.get(i));
				if (tmp<min) min=tmp;
			}
			//executor.memory 设置为最小的那个vcore 可以获得的值
			int executorMemory=min;
			this.executorConfList.add(new yarnResourceEstimator(minCores, executorMemory, cores_per_task));
		}
		
		else{
			//异常处理
		}
	}
	
	
	
	//配置 RDD.parallelism :spark.default.parallelism :Others: total number of cores on all executor nodes or 2, whichever is larger
	private void navieRoundAdjustments(int inputFileSizeMB){
		if (inputFileSizeMB==-1) {
				this.RDDparallelism=-1;
				return;
		}
		//在得到输入文件大小的情况下进行优化，使得任务数成为资源的整数倍
		if (this.executorConfList.isEmpty()) System.err.println("HuristicConf 出错，没有找到可用的配置");
		this.maxParallelism=this.executorConfList.get(0).avalibaleParallelism;
		int number_of_tasks= (int) Math.ceil(inputFileSizeMB*1.0/Global.HDFS_BLOCK_SIZE_IN_MB);
		this.RDDparallelism=  (int) Math.ceil(number_of_tasks*1.0/maxParallelism)*maxParallelism;
	}
	/*
	 * 注意这种方法实际上和应用与数据量有关系，而且关系很大，像类似于PageRank一类的应用他的数据在实验集中往往很小，需要对任务类型和相应开销进行建模
	 * */
	
	/**
	 * 
	 * 直接输出可以用于调优的字符串
	 * */
	public ArrayList<String> toConfList(){
		ArrayList<String> tmp = new ArrayList<String>();

		tmp.add("\t--conf spark.executor.cores="+String.valueOf(this.executorConfList.get(0).executor_cores));
		tmp.add("\t--conf spark.executor.memory="+String.valueOf(this.executorConfList.get(0).executor_memory+"m"));
		tmp.add("\t--conf spark.task.cpus="+String.valueOf(this.executorConfList.get(0).cores_per_task));
		
		if(RDDparallelism!=-1) {
			tmp.add("\t--conf spark.default.parallelism="+String.valueOf(this.RDDparallelism));//如果用户没有输入给定的信息那么就没办法确定并行度了
		}
		else {
			//tmp.add("\t--conf spark.default.parallelism="+String.valueOf(this.executorConfList.get(0).executor_cores));//如果用户没有输入给定的信息那么就没办法确定并行度了
		}
		
		tmp.add("\t--conf spark.driver.cores="+String.valueOf(Global.CORE_MASTER/2));
		tmp.add("\t--conf spark.driver.memory="+String.valueOf(Global.MEMORY_MASRTER/3)+"m");
		

		return tmp;
	}
	
	
	//关于driver 的cores 和executors 的配置：在driver上尽可能第多用资源
	
	
}
